package kafka.admin;

import kafka.admin.ConsumerGroupCommand;
import scala.collection.JavaConversions;

import java.util.List;

public class ZKConsumerService implements ConsumerService {

    private String zookeeperServers;

    public ZKConsumerService(String zookeeperServers) {
        this.zookeeperServers = zookeeperServers;
    }

    @Override
    public List<String> listConsumerGroups() {
        String[] options = new String[]{
                "--zookeeper", zookeeperServers,
                "--list"
        };
        ConsumerGroupCommand.ConsumerGroupCommandOptions consumerGroupCommandOptions = new ConsumerGroupCommand.ConsumerGroupCommandOptions(options);
        ConsumerGroupCommand.ZkConsumerGroupService zkConsumerGroupService = new ConsumerGroupCommand.ZkConsumerGroupService(consumerGroupCommandOptions);
        scala.collection.immutable.List<String> stringList1 = zkConsumerGroupService.listGroups();
        zkConsumerGroupService.close();
        return JavaConversions.seqAsJavaList(stringList1);
    }
}
